package com.cwm.frame4.service.impl;

import com.aliyun.mns.client.*;
import com.aliyun.mns.model.*;
import com.cwm.common.message.*;
import com.cwm.frame4.service.MessageService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.env.Environment;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;

/**
 * {@link MessageService} implementation that publishes messages to Aliyun MNS topics.
 *
 * <p>The constructor connects to MNS, looks up the mail topic among the topics whose name
 * starts with the configured prefix, and creates the topic through
 * {@code MNSClient#createPullTopic} when it is not found.
 *
 * @author Bill
 * @version 1.0
 * @since 2021-11-26
 */
@Slf4j
@Service
public class MessageServiceImpl implements MessageService {

    /** Number of topics requested per {@code listTopic} call (page size). */
    public static final int MAX_TOPICS = 100;
    /** Polling wait time, in seconds, set on the queue template used when creating a topic. */
    public static final int WAIT_SECONDS = 5;
    /** Max message size set on the queue template (presumably bytes - confirm against MNS docs). */
    public static final long QUEUE_MAX_MESSAGE_SIZE = 2048L;

    /** Value of the {@code SPRING_PROFILES_ACTIVE} property; may be {@code null} when unset. */
    private final String profile;
    private final MessagingTopicProperty messagingTopicProperty;
    /** Fully qualified mail topic name: prefix + profile + "-" + configured mail name. */
    private final String topicMail;

    /** Handle of the mail topic used for publishing. */
    private final CloudTopic topicMailAC;

    private final MNSClient mnsClient;

    private final ObjectMapper objectMapper;

    /**
     * Connects to MNS and resolves (or creates) the mail topic.
     *
     * <p>Note that this constructor calls the remote MNS service; any exception thrown by the
     * SDK propagates to the caller.
     *
     * @param aliyunProperty         supplies the access key id/secret and the MNS account endpoint
     * @param env                    environment queried for {@code SPRING_PROFILES_ACTIVE}
     * @param messagingTopicProperty supplies the topic prefix and the mail topic name
     * @param objectMapper           mapper used to serialize outgoing messages
     */
    public MessageServiceImpl(
            AliyunProperty aliyunProperty,
            Environment env,
            MessagingTopicProperty messagingTopicProperty,
            ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
        // messagingTopicProperty and profile must be assigned before getTopicName() is called.
        this.messagingTopicProperty = messagingTopicProperty;
        // If the property is absent, profile is null and the topic name contains the text "null".
        this.profile = env.getProperty("SPRING_PROFILES_ACTIVE");
        this.topicMail = getTopicName(messagingTopicProperty.getMail());

        MNSProperty mns = aliyunProperty.getMns();
        CloudAccount account = new CloudAccount(
                aliyunProperty.getAccessKeyId(), aliyunProperty.getAccessKeySecret(), mns.getAccountendpoint());
        this.mnsClient = account.getMNSClient();

        Set<String> topicNames = listTopics();
        this.topicMailAC = topicNames.contains(topicMail)
                ? mnsClient.getTopicRef(topicMail)
                : createMailTopic();
    }

    /**
     * Serializes the given message with the injected {@link ObjectMapper} and publishes it to
     * the mail topic, tagged with the configured mail name.
     *
     * <p>A serialization failure is logged and the message is not published; exceptions thrown
     * while publishing are not caught here.
     *
     * @param mailMessage the message to publish
     */
    @Override
    public void pubTopicMail(MailMessage mailMessage) {
        try {
            String rawMessage = objectMapper.writeValueAsString(mailMessage);
            pubTopic(topicMailAC, rawMessage, messagingTopicProperty.getMail());
        } catch (JsonProcessingException e) {
            log.error("messaging error", e);
        }
    }

    /**
     * Publishes a raw message body with the given tag to the given topic.
     *
     * @param cloudTopic topic to publish to
     * @param rawMessage message body
     * @param tag        message tag
     */
    private void pubTopic(CloudTopic cloudTopic, String rawMessage, String tag) {
        TopicMessage tMessage = new RawTopicMessage();
        tMessage.setMessageTag(tag);
        tMessage.setBaseMessageBody(rawMessage);
        cloudTopic.publishMessage(tMessage);
    }

    /**
     * Creates the mail topic with a single queue named after the topic.
     *
     * @return the raw topic handle of the newly created pull topic
     */
    private CloudTopic createMailTopic() {
        QueueMeta queueMetaTemplate = new QueueMeta();
        queueMetaTemplate.setPollingWaitSeconds(WAIT_SECONDS);
        queueMetaTemplate.setMaxMessageSize(QUEUE_MAX_MESSAGE_SIZE);

        TopicMeta topicMeta = new TopicMeta();
        topicMeta.setTopicName(topicMail);

        // Kept as a Vector: that is the collection type passed to createPullTopic originally.
        Vector<String> pushQueueNames = new Vector<>(1);
        pushQueueNames.add(getQueueName(topicMail));

        // NOTE(review): the boolean presumably asks the SDK to create the queue too - confirm.
        CloudPullTopic pullTopic = mnsClient.createPullTopic(
                topicMeta, pushQueueNames, true, queueMetaTemplate);
        return pullTopic.getRawTopic();
    }

    /**
     * Collects the names of all topics that start with the configured prefix.
     *
     * <p>Results are fetched page by page ({@link #MAX_TOPICS} per request) until the service
     * returns no further marker, so topics beyond the first page are not missed.
     *
     * @return the topic names; empty (never {@code null}) when nothing is found
     */
    private Set<String> listTopics() {
        Set<String> names = new HashSet<>();
        String marker = "";
        while (true) {
            PagingListResult<TopicMeta> page =
                    mnsClient.listTopic(messagingTopicProperty.getPrefix(), marker, MAX_TOPICS);
            if (page == null) {
                break;
            }
            List<TopicMeta> metas = page.getResult();
            if (metas != null) {
                for (TopicMeta meta : metas) {
                    names.add(meta.getTopicName());
                }
            }
            String nextMarker = page.getMarker();
            // Stop on the last page; the equality check guards against a marker that never advances.
            if (nextMarker == null || nextMarker.isEmpty() || nextMarker.equals(marker)) {
                break;
            }
            marker = nextMarker;
        }
        return names;
    }

    /**
     * Builds the full topic name from the configured prefix, the active profile and a base name.
     *
     * @param originName base topic name
     * @return {@code prefix + profile + "-" + originName}
     */
    private String getTopicName(String originName) {
        return messagingTopicProperty.getPrefix() + profile + "-" + originName;
    }

    /**
     * Derives the queue name for a topic.
     *
     * @param topicName full topic name
     * @return the topic name with the suffix {@code "-queue"}
     */
    private String getQueueName(String topicName) {
        return topicName + "-queue";
    }

    /**
     * Closes the underlying MNS client if it is still open.
     *
     * @throws IOException declared by the overridden method
     */
    @Override
    public void close() throws IOException {
        if (mnsClient != null && mnsClient.isOpen()) {
            mnsClient.close();
        }
    }

}
